package com.duobaoyu.chatwebsocket.handler;

import com.alibaba.fastjson.JSONObject;
import com.duobaoyu.chatwebsocket.constant.MqTagConstant;
import com.duobaoyu.chatwebsocket.dto.*;
import com.duobaoyu.chatwebsocket.enums.MsgActionEnum;
import com.duobaoyu.chatwebsocket.feign.SystemCenterFeign;
import com.duobaoyu.chatwebsocket.netty.UserChannelRel;
import com.duobaoyu.chatwebsocket.producer.WebSocketProducer;
import com.duobaoyu.chatwebsocket.util.DingTalkUtil;
import com.duobaoyu.chatwebsocket.util.JsonUtils;
import com.duobaoyu.chatwebsocket.util.NetUtil;
import com.duobaoyu.chatwebsocket.util.SpringUtil;
import com.duobaoyu.chatwebsocket.util.UUIDUtil;
import com.duobaoyu.middleware.common.Result;
import com.duobaoyu.systemcenter.model.result.ChatUserCacheResultDto;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * 接收websocket消息handler
 * 配合
 *
 * @author guigu
 */
@Slf4j
@Component
public class ChatReadHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {

        // 获取客户端传输过来的消息
        String content = msg.text();
        log.info("获取客户端传输过来的消息:{}", content);

        if (StringUtils.isEmpty(content)) {
            throw new NullPointerException("不能发送空消息");
        }

        Channel currentChannel = ctx.channel();
        // 1. 获取客户端发来的消息
        WebSocketMessageDto webSocketMessageDto = JsonUtils.jsonToPojo(content, WebSocketMessageDto.class);
        if (webSocketMessageDto == null) {
            throw new NullPointerException("消息体错误");
        }
        String event = webSocketMessageDto.getEvent();
        // 2. 判断消息类型，根据不同的类型来处理不同的业务
        MsgActionEnum msgActionEnum = MsgActionEnum.getByEvent(event);
        if (msgActionEnum == null) {
            throw new NullPointerException("消息类型错误");
        }
        switch (msgActionEnum) {
            case PING:
                //直接返回当前uuid
                webSocketMessageDto.setEvent(MsgActionEnum.PONG.getEvent());
                if (currentChannel.isActive() && currentChannel.isWritable()) {
                    currentChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.objectToJson(webSocketMessageDto)));
                }
                break;
            case CONNECT:
                handleConnect(currentChannel, webSocketMessageDto);
                break;
            case DIS_CONNECT:
                //退出登录，断开链接，删除长连接缓存
                this.handlerRemoved(ctx);
                break;
            case SIGN_VOICE:
                //签收消息，仅用于签收语音，文字消息由聊天记录列表签收，一次签收所有文字消息
                this.handlerSignVoice(webSocketMessageDto);
                break;
            case CLIENT_CONNECT:
                //用户端初始化连接
                this.handleClientConnect(currentChannel, webSocketMessageDto);
                break;
            case SIGN_ALL:
                this.handlerSignAll(webSocketMessageDto);
                break;
            case CLIENT_MESSAGE:
                //来自头条用户的消息
                WebSocketMqDto webSocketUserMqDto = new WebSocketMqDto(webSocketMessageDto);
                WebSocketProducer webSocketUserProducer = SpringUtil.getBean(WebSocketProducer.class);
                webSocketUserProducer.sendMsgToChatSystem(webSocketUserMqDto, MqTagConstant.CLIENT_MSG);
                break;
            default:
                /*
                 * 其他消息类型直接发送至聊天业务服务器
                 * 断开连接也发送mq消息至业务服务器做处理
                 */
                WebSocketMqDto webSocketMqDto = new WebSocketMqDto(webSocketMessageDto);
                webSocketMqDto.setCurrentChannelId(currentChannel.id().asLongText());
                webSocketMqDto.setIp(NetUtil.getHostIp());
                WebSocketProducer webSocketProducer = SpringUtil.getBean(WebSocketProducer.class);
                //此处使用前端定义的msgId作为rocketMq的msgId
                webSocketProducer.sendMsgToChatSystem(webSocketMqDto, MqTagConstant.EMP_MSG);
        }
    }



    private void handlerSignAll(WebSocketMessageDto webSocketMessageDto) {
        WebSocketProducer webSocketProducer = SpringUtil.getBean(WebSocketProducer.class);
        String tag = MqTagConstant.SIGN_ALL;
        webSocketProducer.sign(webSocketMessageDto, tag);
    }

    private void handlerSignVoice(WebSocketMessageDto webSocketMessageDto) {
        WebSocketProducer webSocketProducer = SpringUtil.getBean(WebSocketProducer.class);
        webSocketProducer.sign(webSocketMessageDto, MqTagConstant.SIGN_VOICE);
    }


    private void handleConnect(Channel currentChannel, WebSocketMessageDto webSocketMessageDto) {

        String token = webSocketMessageDto.getData().toString();
        log.info("token:{}", token);
        SystemCenterFeign systemCenterFeign = SpringUtil.getBean(SystemCenterFeign.class);
        WebSocketResultDto webSocketResultDto;

        ChatWriteHandler chatWriteHandler = SpringUtil.getBean(ChatWriteHandler.class);
        //判断是否登录
        Result<ChatUserCacheResultDto> userBasicMessageByToken = systemCenterFeign.getChatUserBasicMessageByToken(token);
        log.info("userBasicMessageByToken:{}", userBasicMessageByToken);

        if (!userBasicMessageByToken.isSuccess()) {
            String uuid = UUIDUtil.getUUID();
            log.error("systemCenterFeign getChatUserBasicMessageByToken failed logID:{}", uuid);
            webSocketResultDto = WebSocketResultDto.error(uuid);
            chatWriteHandler.loginResultToSale(webSocketResultDto, currentChannel.id().asLongText());
            return;
        }
        ChatUserCacheResultDto chatUserCacheResultDto = userBasicMessageByToken.getData();
        log.info("chatUserCacheResultDto:{}", chatUserCacheResultDto);
        if (chatUserCacheResultDto == null) {
            String uuid = UUIDUtil.getUUID();
            log.error("systemCenterFeign getChatUserBasicMessageByToken chatUserCacheResultDto is null logID:{}", uuid);
            webSocketResultDto = WebSocketResultDto.error(uuid);
            chatWriteHandler.loginResultToSale(webSocketResultDto, currentChannel.id().asLongText());
            return;
        }
        Integer empId = chatUserCacheResultDto.getEmpId();
        if (empId == null) {
            String uuid = UUIDUtil.getUUID();
            log.error("systemCenterFeign getChatUserBasicMessageByToken empId is null logID:{}", uuid);
            webSocketResultDto = WebSocketResultDto.error(uuid);
            chatWriteHandler.loginResultToSale(webSocketResultDto, currentChannel.id().asLongText());
            return;
        }
        //返回登录成功给前端
        webSocketResultDto = WebSocketResultDto.success();
        chatWriteHandler.loginResultToSale(webSocketResultDto, currentChannel.id().asLongText());

        //发送消息给业务服务器
        String channelId = currentChannel.id().asLongText();
        String ip = NetUtil.getHostIp();
        EmpChannelMsgDto empChannelMsgDto = new EmpChannelMsgDto();
        empChannelMsgDto.setChannelId(channelId);
        empChannelMsgDto.setEmpId(empId.toString());
        empChannelMsgDto.setIp(ip);
        empChannelMsgDto.setRemoteIp(currentChannel.remoteAddress().toString());
        //登录校验成功，缓存用户channel和信息
        UserChannelRel.channelEmpManager.put(currentChannel, empChannelMsgDto);
        WebSocketProducer webSocketProducer = SpringUtil.getBean(WebSocketProducer.class);
        webSocketProducer.connect(empChannelMsgDto);
    }

    private void handleClientConnect(Channel currentChannel, WebSocketMessageDto webSocketMessageDto) {
        String token = webSocketMessageDto.getData().toString();
        log.info("头条用户的token:{}", token);
        log.info("登录信息：{}", JSONObject.toJSON(webSocketMessageDto));
        String channelId = currentChannel.id().asLongText();
        String ip = NetUtil.getHostIp();
        UserChannelMsgDto userChannelMsgDto = new UserChannelMsgDto();
        userChannelMsgDto.setChannelId(channelId);
        userChannelMsgDto.setToken(token);
        userChannelMsgDto.setIp(ip);
        WebSocketMqDto webSocketMqDto = new WebSocketMqDto();
        webSocketMqDto.setContent(userChannelMsgDto);
        webSocketMqDto.setIp(ip);
        WebSocketProducer webSocketProducer = SpringUtil.getBean(WebSocketProducer.class);
        //此时chat系统还需要校验token，所以不放入连接池
        webSocketProducer.sendUserToChatSystem(webSocketMqDto, MqTagConstant.CLIENT_CONNECT_WEBSOCKET);
    }

    /**
     * 检测心跳
     *
     * @param ctx ChannelHandlerContext
     * @param evt 事件
     * @throws Exception Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        // 判断evt是否是IdleStateEvent（用于触发用户事件，包含 读空闲/写空闲/读写空闲 ）
        if (evt instanceof IdleStateEvent) {
            // 强制类型转换
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.ALL_IDLE) {
                log.info("channel关闭前，users的数量为：" + UserChannelRel.users.size());
                // 关闭无用的channel，以防资源浪费
                String hostIp = NetUtil.getHostIp();
                if (null == hostIp || !hostIp.equals("172.16.16.117")) {
                    this.handlerRemoved(ctx);
                }
                log.info("channel关闭后，users的数量为：" + UserChannelRel.users.size());
            }
        }

    }

    /**
     * 当客户端连接服务端之后（打开连接）
     * 获取客户端的channel，并且放到ChannelGroup中去进行管理
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        log.info("客户端新增，channelId(long) 为：{}", ctx.channel().id().asLongText());
        UserChannelRel.users.add(ctx.channel());
        UserChannelRel.channelIdWithChannel.put(ctx.channel().id().asLongText(), ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        String channelId = ctx.channel().id().asLongText();
        log.info("客户端被移除，channelId(long) 为：{} ,远程地址为:{}", channelId, ctx.channel().remoteAddress().toString());
        this.removeCtx(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        DingTalkUtil dingTalkUtil = SpringUtil.getBean(DingTalkUtil.class);
        String uuid = UUIDUtil.getUUID();
        dingTalkUtil.notify("Websocket 长连接异常:" + "\n 日志唯一id :" + uuid + "\n 长连接id :" + ctx.channel().id().asLongText() + "\n 远程地址 :" + ctx.channel().remoteAddress().toString() + "\n exception message:" + cause.getMessage());
        log.warn("日志唯一id：" + uuid + "\n长连接异常" + "\n 长连接id :" + ctx.channel().id().asLongText() + "\n 远程地址 :" + ctx.channel().remoteAddress().toString(), cause);
        WebSocketResultDto webSocketResultDto = new WebSocketResultDto();
        webSocketResultDto.setEvent(MsgActionEnum.NEED_LOGIN.getEvent());
        webSocketResultDto.setMsg("出错了！在工单系统中，附上当前的截图、用户微信昵称、顾问姓名、出错时间或者错误描述，" + "错误id,提交反馈！错误id：" + uuid);
        ctx.writeAndFlush(new TextWebSocketFrame(JsonUtils.objectToJson(webSocketResultDto)));
        this.removeCtx(ctx.channel());
    }


    public void removeCtx(Channel channel) {
        SocketAddress remote = channel.remoteAddress();
        channel.close().addListener((ChannelFutureListener) future
                -> log.info("closeChannel: close the connection to remote address[{}] result: {}",
                remote.toString(), future.isSuccess()));
        EmpChannelMsgDto empChannelMsgDto = UserChannelRel.channelEmpManager.get(channel);
        UserChannelRel.channelIdWithChannel.remove(channel.id().asLongText());
        if (empChannelMsgDto != null) {
            //移除本地长连接信息缓存
            UserChannelRel.channelEmpManager.remove(channel);
            //发送消息至业务服务器，清除长连接信息redis缓存
            WebSocketProducer webSocketProducer = SpringUtil.getBean(WebSocketProducer.class);
            log.info("断开连接 empChannelMsgDto：{}", empChannelMsgDto);
            webSocketProducer.disconnect(empChannelMsgDto);
        }
        UserChannelMsgDto userChannelMsgDto = UserChannelRel.channelWithUser.get(channel);
        //移除用户长链接
        if (userChannelMsgDto != null) {
            Long userId = userChannelMsgDto.getUserId();
            ArrayList<Channel> channels = UserChannelRel.userIdWithChannelList.get(userId);
            if (channels.size() == 1) {
                UserChannelRel.userIdWithChannelList.remove(userId);
            } else {
                //channels.stream().filter(e->!e.equals(channel)).collect(Collectors.toList());
                Iterator<Channel> iterator = channels.iterator();
                while (iterator.hasNext()) {
                    Channel next = iterator.next();
                    if (channel == next) {
                        iterator.remove();
                    }
                }
            }

            // 移除长链接
            UserChannelRel.channelWithUser.remove(channel);
        }

    }


}
